{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Taxi Fare Prediction Using Realtime Traffic Data\n",
    "\n",
    "This will be the same as our previous taxi fare model, but with the addition of ‘trips_last_5min’ data as a feature. This is our proxy for traffic"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "import numpy as np\n",
    "import shutil\n",
    "print(tf.__version__)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load raw data\n",
    "\n",
    "These are the same files used previously for taxi fare prediction, however an additional field `trips_last_5min` has been added"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gcloud storage cp gs://cloud-training-demos/taxifare/traffic/small/*.csv .\n",
    "!ls -l *.csv"
   ]
  },
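  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional sanity check: peek at the header row of the downloaded training file to confirm the new `trips_last_5min` column is present. This assumes the copy above produced a local `taxi-train.csv`, which is the file name the training code below expects."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!head -2 taxi-train.csv"
   ]
  },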
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and Evaluate input functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "CSV_COLUMN_NAMES = [\"fare_amount\",\"dayofweek\",\"hourofday\",\"pickuplon\",\"pickuplat\",\\\n",
    "                    \"dropofflon\",\"dropofflat\",\"trips_last_5min\"]\n",
    "CSV_DEFAULTS = [[0.0],[1],[0],[-74.0],[40.0],[-74.0],[40.7],[0]]\n",
    "\n",
    "def read_dataset(csv_path):\n",
    "    def _parse_row(row):\n",
    "        # Decode the CSV row into list of TF tensors\n",
    "        fields = tf.decode_csv(records = row, record_defaults = CSV_DEFAULTS)\n",
    "\n",
    "        # Pack the result into a dictionary\n",
    "        features = dict(zip(CSV_COLUMN_NAMES, fields))\n",
    "        \n",
    "        # NEW: Add engineered features\n",
    "        features = add_engineered_features(features)\n",
    "        \n",
    "        # Separate the label from the features\n",
    "        label = features.pop(\"fare_amount\") # remove label from features and store\n",
    "\n",
    "        return features, label\n",
    "    \n",
    "    # Create a dataset containing the text lines.\n",
    "    dataset = tf.data.Dataset.list_files(file_pattern = csv_path) # (i.e. data_file_*.csv)\n",
    "    dataset = dataset.flat_map(map_func = lambda filename:tf.data.TextLineDataset(filenames = filename).skip(count = 1))\n",
    "\n",
    "    # Parse each CSV row into correct (features,label) format for Estimator API\n",
    "    dataset = dataset.map(map_func = _parse_row)\n",
    "    \n",
    "    return dataset\n",
    "\n",
    "def train_input_fn(csv_path, batch_size = 128):\n",
    "    #1. Convert CSV into tf.data.Dataset with (features,label) format\n",
    "    dataset = read_dataset(csv_path)\n",
    "      \n",
    "    #2. Shuffle, repeat, and batch the examples.\n",
    "    dataset = dataset.shuffle(buffer_size = 1000).repeat(count = None).batch(batch_size = batch_size)\n",
    "   \n",
    "    return dataset\n",
    "\n",
    "def eval_input_fn(csv_path, batch_size = 128):\n",
    "    #1. Convert CSV into tf.data.Dataset with (features,label) format\n",
    "    dataset = read_dataset(csv_path)\n",
    "\n",
    "    #2.Batch the examples.\n",
    "    dataset = dataset.batch(batch_size = batch_size)\n",
    "   \n",
    "    return dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Feature Engineering: feature columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 1. One hot encode dayofweek and hourofday\n",
    "fc_dayofweek = tf.feature_column.categorical_column_with_identity(key = \"dayofweek\", num_buckets = 7)\n",
    "fc_hourofday = tf.feature_column.categorical_column_with_identity(key = \"hourofday\", num_buckets = 24)\n",
    "\n",
    "# 2. Bucketize latitudes and longitudes\n",
    "NBUCKETS = 16\n",
    "latbuckets = np.linspace(start = 38.0, stop = 42.0, num = NBUCKETS).tolist()\n",
    "lonbuckets = np.linspace(start = -76.0, stop = -72.0, num = NBUCKETS).tolist()\n",
    "def bucketize_fc(key,boundaries):\n",
    "    return tf.feature_column.bucketized_column(\n",
    "        source_column = tf.feature_column.numeric_column(key = key), \n",
    "        boundaries = boundaries)\n",
    "fc_bucketized_plat = bucketize_fc(\"pickuplon\",lonbuckets)\n",
    "fc_bucketized_plon = bucketize_fc(\"pickuplat\",latbuckets)\n",
    "fc_bucketized_dlat = bucketize_fc(\"dropofflon\",lonbuckets)\n",
    "fc_bucketized_dlon = bucketize_fc(\"dropofflat\",latbuckets)\n",
    "\n",
    "# 3. Cross features to get combination of day and hour\n",
    "fc_crossed_day_hr = tf.feature_column.crossed_column(keys = [fc_dayofweek, fc_hourofday], hash_bucket_size = 24 * 7)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Feature Engineering: input function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_engineered_features(features):\n",
    "    features[\"dayofweek\"] = features[\"dayofweek\"] - 1 # subtract one since our days of week are 1-7 instead of 0-6\n",
    "    \n",
    "    features[\"latdiff\"] = features[\"pickuplat\"] - features[\"dropofflat\"] # East/West\n",
    "    features[\"londiff\"] = features[\"pickuplon\"] - features[\"dropofflon\"] # North/South\n",
    "    features[\"euclidean_dist\"] = tf.sqrt(x = features[\"latdiff\"]**2 + features[\"londiff\"]**2)\n",
    "\n",
    "    return features"
   ]
  },
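  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional: a minimal sketch of pulling one small batch from `train_input_fn` to verify that CSV parsing and the engineered features work. It assumes TensorFlow 1.x graph mode, that the cells defining the input functions and `add_engineered_features` above have been run, and that `taxi-train.csv` is present locally."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sanity check: materialize one small batch from the training input\n",
    "# function (a sketch assuming TF 1.x graph mode and a local taxi-train.csv).\n",
    "with tf.Graph().as_default():\n",
    "    dataset = train_input_fn(\"./taxi-train.csv\", batch_size = 2)\n",
    "    features, label = dataset.make_one_shot_iterator().get_next()\n",
    "    with tf.Session() as sess:\n",
    "        feature_values, label_values = sess.run([features, label])\n",
    "\n",
    "print(sorted(feature_values.keys())) # should include latdiff, londiff, euclidean_dist\n",
    "print(label_values)                  # fare_amount values for the batch"
   ]
  },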
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Gather list of feature columns\n",
    "\n",
    "Note we're standard normalizing our traffic proxy. The mean and standard deviation were calculated on the full dataset in BigQuery, then hardcoded here.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "feature_cols = [\n",
    "    #1. Engineered using tf.feature_column module\n",
    "    tf.feature_column.indicator_column(categorical_column = fc_crossed_day_hr),\n",
    "    fc_bucketized_plat,\n",
    "    fc_bucketized_plon,\n",
    "    fc_bucketized_dlat,\n",
    "    fc_bucketized_dlon,\n",
    "    #2. Engineered in input functions\n",
    "    tf.feature_column.numeric_column(key = \"latdiff\"),\n",
    "    tf.feature_column.numeric_column(key = \"londiff\"),\n",
    "    tf.feature_column.numeric_column(key = \"euclidean_dist\"),\n",
    "    #3. Traffic proxy\n",
    "    tf.feature_column.numeric_column(key = \"trips_last_5min\",\n",
    "                                     normalizer_fn=lambda x: (x - 2070) / 616)\n",
    "]"
   ]
  },
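  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optional: a rough local check of the hardcoded normalization statistics (mean 2070, standard deviation 616). This is only a sketch: it assumes pandas is available and uses the small training sample downloaded above, so the numbers will only be roughly in the same ballpark as the full-dataset values computed in BigQuery."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: compare the hardcoded full-dataset statistics against the small\n",
    "# local sample (assumes pandas is installed and taxi-train.csv exists locally).\n",
    "import pandas as pd\n",
    "\n",
    "sample = pd.read_csv(\"./taxi-train.csv\", header = 0, names = CSV_COLUMN_NAMES)\n",
    "print(\"sample mean:\", sample[\"trips_last_5min\"].mean())\n",
    "print(\"sample std: \", sample[\"trips_last_5min\"].std())\n",
    "print(\"hardcoded mean/std: 2070 / 616\")"
   ]
  },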
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Serving Input Receiver function "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def serving_input_receiver_fn():\n",
    "    receiver_tensors = {\n",
    "        'dayofweek' : tf.placeholder(dtype = tf.int32, shape = [None]), # shape is vector to allow batch of requests\n",
    "        'hourofday' : tf.placeholder(dtype = tf.int32, shape = [None]),\n",
    "        'pickuplon' : tf.placeholder(dtype = tf.float32, shape = [None]), \n",
    "        'pickuplat' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        'dropofflat' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        'dropofflon' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "        'trips_last_5min' : tf.placeholder(dtype = tf.float32, shape = [None]),\n",
    "    }\n",
    "    \n",
    "    features = add_engineered_features(receiver_tensors) # 'features' is what is passed on to the model\n",
    "    \n",
    "    return tf.estimator.export.ServingInputReceiver(features = features, receiver_tensors = receiver_tensors)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and Evaluate"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "OUTDIR = \"taxi_trained\"\n",
    "shutil.rmtree(path = OUTDIR, ignore_errors = True) # start fresh each time\n",
    "tf.summary.FileWriterCache.clear() # ensure filewriter cache is clear for TensorBoard events file\n",
    "tf.logging.set_verbosity(v = tf.logging.INFO) # so loss is printed during training\n",
    "\n",
    "model = tf.estimator.DNNRegressor(\n",
    "    hidden_units = [10,10], # specify neural architecture\n",
    "    feature_columns = feature_cols, \n",
    "    model_dir = OUTDIR,\n",
    "    config = tf.estimator.RunConfig(\n",
    "        tf_random_seed = 1, # for reproducibility\n",
    "        save_checkpoints_steps = 200 # checkpoint every N steps\n",
    "    ) \n",
    ")\n",
    "\n",
    "# Add custom evaluation metric\n",
    "def my_rmse(labels, predictions):\n",
    "    pred_values = tf.squeeze(input = predictions[\"predictions\"], axis = -1)\n",
    "    return {\"rmse\": tf.metrics.root_mean_squared_error(labels = labels, predictions = pred_values)}\n",
    "\n",
    "model = tf.contrib.estimator.add_metrics(estimator = model, metric_fn = my_rmse) \n",
    "    \n",
    "train_spec = tf.estimator.TrainSpec(\n",
    "    input_fn = lambda: train_input_fn(\"./taxi-train.csv\"),\n",
    "    max_steps = 5000)\n",
    "\n",
    "exporter = tf.estimator.FinalExporter(name = \"exporter\", serving_input_receiver_fn = serving_input_receiver_fn) # export SavedModel once at the end of training\n",
    "# Note: alternatively use tf.estimator.BestExporter to export at every checkpoint that has lower loss than the previous checkpoint\n",
    "\n",
    "eval_spec = tf.estimator.EvalSpec(\n",
    "    input_fn = lambda: eval_input_fn(\"./taxi-valid.csv\"),\n",
    "    steps = None,\n",
    "    start_delay_secs = 1, # wait at least N seconds before first evaluation (default 120)\n",
    "    throttle_secs = 1, # wait at least N seconds before each subsequent evaluation (default 600)\n",
    "    exporters = exporter) # export SavedModel once at the end of training\n",
    "\n",
    "tf.estimator.train_and_evaluate(estimator = model, train_spec = train_spec, eval_spec = eval_spec)"
   ]
  },
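  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After training completes, the `FinalExporter` writes a SavedModel under the model directory. A quick listing (assuming the usual Estimator export layout of `<model_dir>/export/<exporter_name>/<timestamp>`) confirms it was written."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ls -R taxi_trained/export/exporter"
   ]
  },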
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Results\n",
    "\n",
    "For me, adding the traffic information reduced the RMSE from 4.17 (without the feature) to 4.14. It looks like it helped, but the lift isn't dramatic. Perhaps this is because information about traffic is already mostly captured by the day_hr feature cross."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Copyright 2019 Google Inc. Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
